🕺Kapoeira💃

kapoeira

Presentation

kara
Mehdi Rebiai

Presentation

odile
Johanna Vauchel

Take Away 🎁

  • Discover a new tool to test your Kafka streams

  • Improve communication between PO, QA, and DEV

  • Tips for using it every day

  • Have a good time (we hope)

🎬 Kapoeira story 📽️

Enrich and collect data

enrichData

We are perfect!

perfect

Data is perfect!

pipeline example

NO!

pipeline example poo

Solution ?

TEST OUR STREAMS!

How to test?

fast

Fast and efficient…

Scala Test Example

package com.lectra.kafka.stream.example

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, GivenWhenThen}

import java.io.File
import java.util.UUID

class KafkaStreamSelectKeyTest extends AnyFlatSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with GivenWhenThen {

  private val stringSerializer = new StringSerializer()
  private val stringDeserializer = new StringDeserializer()

  private var driver: TopologyTestDriver = _
  private var inputTopic: TestInputTopic[String, String] = _
  private var outputTopic: TestOutputTopic[String, String] = _

  private def tempDir: File = {
    val ioDir = System.getProperty("java.io.tmpdir")
    val f = new File(ioDir, "kafka-" + UUID.randomUUID().toString)
    f.mkdirs()
    f.deleteOnExit()
    f
  }

  private def buildTopology(): Topology = {
    import org.apache.kafka.streams.scala.StreamsBuilder
    val builder = new StreamsBuilder
    KafkaStreamSelectKey.topology(builder)
    builder.build()
  }

  override def beforeEach(): Unit = {
    KafkaStreamSelectKey.config.put(StreamsConfig.STATE_DIR_CONFIG, tempDir.getAbsolutePath)
    driver = new TopologyTestDriver(buildTopology(), KafkaStreamSelectKey.config)
    inputTopic = driver.createInputTopic(KafkaStreamSelectKey.topicIn, stringSerializer, stringSerializer)
    outputTopic = driver.createOutputTopic(KafkaStreamSelectKey.topicChangedKey, stringDeserializer, stringDeserializer)
  }

  override def afterEach(): Unit = {
    driver.close()
  }


  "Nominal case for select" should "change the key of records by combining key and value with -" in {
    val key = "mykey"
    val value = "myvalue"
    val key2 = "yourkey"
    val value2 = "yourvalue"

    inputTopic.pipeInput(key, value)
    inputTopic.pipeInput(key2, value2)
    val expectedKey1 = s"$key-$value"
    val expectedKey2 = s"$key2-$value2"

    outputTopic.getQueueSize shouldBe 2
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey1, value)
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey2, value2)

  }


}
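The `KafkaStreamSelectKey` topology itself is not shown in these slides; a minimal sketch of the re-keying rule the test exercises could look like this (the object and method names here are hypothetical, inferred from the assertions above):

```scala
// Hypothetical sketch of the re-keying rule asserted by the test above:
// each record's new key is its old key and its value joined with "-".
object SelectKeyLogic {
  def newKey(key: String, value: String): String = s"$key-$value"
}
```

In the real topology this rule would typically be wired in with something like `stream.selectKey((k, v) => SelectKeyLogic.newKey(k, v))`.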

… But it’s a mock

fail

It does not test the integration with a real Kafka cluster.

… But it’s only unit-level

end to end

How to test several streams?

… But it’s not for QA

les nuls police

Listen, let DEVs do their job!

… But it’s code

simon kara

How to communicate with DEV/PO/QA?

What do we need ?

test pyramid cesar triangle

Integration tests with simple syntax

What do we need ?

kafkalogo

Integration with Kafka Streams

Inspiration: Karate

karate

  • HTTP-based APIs

  • Created by Peter Thomas in 2017

  • Simple syntax (Gherkin)

2020 - Birth of Kapoeira

  • Inner Source

  • Uses command-line tools developed by Confluent

# Console producer
kafka-console-producer \
  --topic orders \
  --bootstrap-server broker:9092

# Console consumer
kafka-console-consumer \
  --topic orders \
  --bootstrap-server broker:9092 \
  --from-beginning

2021 - ZIO

zio

To improve performance and add a parallel mode?

2023 - Open Source

2024 - New features

Thanks to you!

Inside Kapoeira

Cucumber Scala, using a specific Gherkin DSL.

cucumber

How does it work?

archi

How does it work?

kapoeira diagram

Example

Feature: upper-case
  Background:
    Given input topic
      | topic              | alias    | key_type | value_type |
      | topic-simple-value | topic_in | string   | string     | (1)

    And output topic
      | topic                   | alias            | key_type | value_type | readTimeoutInSecond |
      | topic-simple-value      | topic_string_out | string   | string     | 5                   |
      | topic-upper-case-string | topic_out        | string   | string     | 5                   |  (2)
    And var myKey = call function: uuid

  Scenario: My first scenario
    When records with key and value are sent (3)
      | topic_alias | key      | value |
      | topic_in    | ${myKey} | a     |
      | topic_in    | ${myKey} | b     |
      | topic_in    | ${myKey} | c     |
    Then expected records                    (4)
      | topic_alias      | key      | value    |
      | topic_string_out | ${myKey} | input_1  |
      | topic_string_out | ${myKey} | input_2  |
      | topic_string_out | ${myKey} | input_3  |
      | topic_out        | ${myKey} | result_1 |
      | topic_out        | ${myKey} | result_2 |
      | topic_out        | ${myKey} | result_3 |
    And assert input_1 $ == "a"             (5)
    And assert input_2 $ == "b"
    And assert input_3 $ == "c"

    And assert result_1 $ == "A"
    And assert result_2 $ == "B"
    And assert result_3 $ == "C"
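The topology behind this feature is not part of the slides; assuming string values, the transformation the feature checks boils down to upper-casing each value (a hypothetical sketch, with an invented object name):

```scala
// Hypothetical value transformation matching the assertions above:
// the input value is forwarded as-is to one output topic and
// upper-cased on the other.
object UpperCaseLogic {
  def transform(value: String): String = value.toUpperCase
}
```

In a Kafka Streams topology this would typically be a `stream.mapValues(UpperCaseLogic.transform)` feeding `topic-upper-case-string`.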

Report

report

Demo

burger quiz

Architecture

burger quiz

Docker commands

docker compose build --no-cache
docker compose up -d
docker restart kapoeira

burger.feature

Feature: Burger 🍔 feature

  Background:
    Given input topic
      | topic     | alias        | key_type | value_type |
      | bread     | bread-in     | string   | string     |
      | vegetable | vegetable-in | string   | string     |
      | meat      | meat-in      | string   | string     |
    And output topic
      | topic  | alias      | key_type | value_type | readTimeoutInSecond |
      | burger | burger-out | string   | string     | 5                   |
    And var uuid = call function: uuid

  Scenario: Nominal
    When records with key and value are sent
      | topic_alias  | key        | value |
      | bread-in     | 🤤_${uuid} | 🍞    |
      | vegetable-in | 🤤_${uuid} | 🍅    |
      | meat-in      | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | burger-out  | 🤤_${uuid} | order |
    And assert order $ == "🍔"

  Scenario: Not a burger
    When records with key and value are sent
      | topic_alias  | key        | value |
      | bread-in     | 🤤_${uuid} | 🍞    |
      | vegetable-in | 🤤_${uuid} | 🥕    |
      | meat-in      | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | burger-out  | 🤤_${uuid} | order |
    And assert order $ == "🍞 + 🥕 + 🥩"

  Scenario Outline: Many customers
    When records with key and value are sent
      | topic_alias  | key            | value       |
      | bread-in     | <user>_${uuid} | <bread>     |
      | vegetable-in | <user>_${uuid} | <vegetable> |
      | meat-in      | <user>_${uuid} | <meat>      |
    Then expected records
      | topic_alias | key            | value |
      | burger-out  | <user>_${uuid} | order |
    And assert order $ == "<result>"

    Examples:
      | user | bread | vegetable | meat | result |
      | 🤤   | 🍞    | 🍅        | 🥩   | 🍔     |
      | 😋   | 🍞    | 🍅        | 🍗   | 🍔     |
      | 😡   | 🍞    | 🍅        | 🐟   | 🍔     |
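The burger topology exercised by this feature is not shown; a hypothetical sketch of the assembly rule, inferred purely from the scenarios above (the object and method names are invented), could be:

```scala
// Hypothetical assembly rule inferred from the scenarios:
// bread + tomato + any meat yields a burger; any other combination
// is simply listed ingredient by ingredient.
object BurgerLogic {
  def assemble(bread: String, vegetable: String, meat: String): String =
    if (bread == "🍞" && vegetable == "🍅") "🍔"
    else s"$bread + $vegetable + $meat"
}
```

In the real stream this rule would sit behind the join of the `bread`, `vegetable`, and `meat` topics that produces `burger`.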

meal.feature

Feature: Meal 🛍 feature

  Background:
    Given input topic
      | topic       | alias          | key_type | value_type |
      | bread       | bread-in       | string   | string     |
      | vegetable   | vegetable-in   | string   | string     |
      | meat        | meat-in        | string   | string     |
      | side-dishes | side-dishes-in | string   | string     |
    And output topic
      | topic | alias    | key_type | value_type | readTimeoutInSecond |
      | meal  | meal-out | string   | string     | 20                  |
    And var uuid = call function: uuid

  Scenario: Left Join with Left first
    When records with key and value are sent
      | topic_alias    | key        | value |
      | bread-in       | 🤤_${uuid} | 🍞    |
      | vegetable-in   | 🤤_${uuid} | 🍅    |
      | meat-in        | 🤤_${uuid} | 🥩    |
      | side-dishes-in | 🤤_${uuid} | 🥔🍺  |
    Then expected records
      | topic_alias | key        | value |
      | meal-out    | 🤤_${uuid} | notif |
      | meal-out    | 🤤_${uuid} | order |
    And assert notif $ == "🍔"
    And assert order $ == "🛍(🍔 + 🍟🍺)"

  Scenario: Left Join with Right first
    When records with key and value are sent
      | topic_alias    | key        | value |
      | side-dishes-in | 🤤_${uuid} | 🥔🍷  |
      | bread-in       | 🤤_${uuid} | 🍞    |
      | vegetable-in   | 🤤_${uuid} | 🍅    |
      | meat-in        | 🤤_${uuid} | 🥩    |
    Then expected records
      | topic_alias | key        | value |
      | meal-out    | 🤤_${uuid} | order |
    And assert order $ == "🛍(🍔 + 🍟🍷)"
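Similarly, the meal topology is not shown; inferred from the two left-join scenarios (again with invented names), the bagging step could be sketched as:

```scala
// Hypothetical bagging rule inferred from the scenarios above:
// raw potatoes are turned into fries, then the burger and the side
// dishes are packed together in a bag.
object MealLogic {
  def bagUp(burger: String, sideDishes: String): String =
    s"🛍($burger + ${sideDishes.replace("🥔", "🍟")})"
}
```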

REX (feedback)

  • 👐 Big community at Lectra

  • ✏️ Easy for QA to enrich existing tests

  • 🤝 Used as acceptance tests and as specifications during story grooming

  • 🔄 Used as end-to-end tests

Advantages

advantages

  • Runs against real Kafka infrastructure

  • Simple to use

  • Fosters communication between PO, QA, and DEV

  • Tests as documentation

  • Tests as acceptance criteria for stories

Want to use it ?

banco

How to build?

docker build -t kapoeira:latest .

How to use?

docker run --rm -ti \
-v <PATH_TO_YOUR_FEATURES_FOLDER>:/features \
-v /var/run/docker.sock:/var/run/docker.sock \
-e KAFKA_BOOTSTRAP_SERVER=<HOST:PORT[,HOST2:PORT2,HOST3:PORT3,...]> \
-e KAFKA_SCHEMA_REGISTRY_URL=<URL> \
-e KAFKA_USER=<XXX> \
-e KAFKA_PASSWORD=<****> \
-e JAAS_AUTHENT=<true (default) | false> \
-e LOGGING_LEVEL=<INFO (default) | ERROR | ...> \
-e THREADS=<8 (default) | ... > \
lectratech/kapoeira

How to contribute?

TODO